package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"github.com/gogf/gf/util/gconv"
	"os"
	"os/signal"
)

func main() {
	// 设置Kafka broker地址
	brokerList := []string{"localhost:9092"}

	// 配置消费者
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	// 创建消费者
	// Notice 一般都是自己实现 sarama.ConsumerGroupHandler 这个interface，加上自己业务的逻辑～
	consumer, err := sarama.NewConsumer(brokerList, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// 指定要消费的Topic
	topic := "my-topic"
	partition := int32(2) // 指定要消费的分区

	// Notice 根据指定的Topic和分区创建分区消费者～
	// Notice 实际上会定义自己的 Consumer结构体
	consumerPartition, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer consumerPartition.Close()

	// 监听信号，用于优雅地停止消费者
	// Notice 防止 goroutine泄漏！
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, os.Kill)

	// 消费消息
	for {
		select {
		case msg := <-consumerPartition.Messages():
			fmt.Printf("Received message! partition: %v, offset: %v key: %v, value: %v: %v \n", msg.Partition, msg.Offset, gconv.String(msg.Key), gconv.String(msg.Value))
		case err := <-consumerPartition.Errors():
			fmt.Println("Error:", err)
		case <-signals:
			fmt.Println("Received interrupt, shutting down consumer...")
			return
		}
	}
}
